[Data][Flaky] Ensure ActorPoolMapOperator clears all queues on completion #58694
Code Review
This pull request addresses a flaky test by fixing a race condition in ActorPoolMapOperator. The change adds a call to _dispatch_tasks() in all_inputs_done() to ensure any queued bundles are processed when no more inputs are expected. This is a solid fix for the described problem. I've added one minor suggestion to clean up some redundant code in the same method for improved maintainability.
python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
Hi @400Ping, thanks for your contribution! We really appreciate the effort. I think @owenowenisme might be right here,

    if (
        output_buffer_size < self._min_rows_per_bundle
        or output_buffer_size == 0
    ):

it looks like we should be doing

    if (
        output_buffer_size < self._min_rows_per_bundle
        or output_buffer_size == 0
        or self._finalized
    ):

so that there are no more remainders. Can you try that and report back?
Hey @400Ping, would you mind helping me understand the root cause of the assertion error? Also, did the repro script fail before the changes?
Ok, will try to find it.
        output_buffer_size += bundle_size
    else:
        remainder = self._bundle_buffer[idx:]
        break

@bveeramani it feels like this break should have been there from the beginning, right?
cc @bveeramani PTAL
Not very sure if this is the way to solve this.
    if (
        output_buffer_size < self._min_rows_per_bundle
        or output_buffer_size == 0
        or self._finalized

@bveeramani my impression is that when self._finalized=True (i.e., when an operator is completed()), it is possible that this for loop enters the else branch below, populating remainder with non-empty ref bundles.
I also think the break statement is necessary; otherwise, remainder is reassigned on every later iteration.
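
To make the point about the missing break concrete, here is a minimal, self-contained sketch of the loop. It is a toy model, not Ray's implementation: the function name `split_ready_bundles`, its parameters, and the use of plain integers as bundle row counts are assumptions made for illustration only.

```python
def split_ready_bundles(bundle_sizes, min_rows, finalized, use_break):
    """Toy model of the bundling loop discussed in this thread.

    Hypothetical helper, not part of Ray. `bundle_sizes` stands in for
    self._bundle_buffer, with each int being one bundle's row count.
    Returns (taken, remainder).
    """
    taken = []
    remainder = []
    output_buffer_size = 0
    for idx, bundle_size in enumerate(bundle_sizes):
        if (
            output_buffer_size < min_rows
            or output_buffer_size == 0
            or finalized
        ):
            taken.append(bundle_size)
            output_buffer_size += bundle_size
        else:
            # Once this branch is reached the condition above stays false,
            # so without `break` this assignment repeats with a shorter
            # slice on every later iteration.
            remainder = bundle_sizes[idx:]
            if use_break:
                break
    return taken, remainder


sizes = [10, 5, 5, 5]
without_break = split_ready_bundles(sizes, 10, finalized=False, use_break=False)
with_break = split_ready_bundles(sizes, 10, finalized=False, use_break=True)
when_finalized = split_ready_bundles(sizes, 10, finalized=True, use_break=True)
```

In this toy model, `without_break` ends with a remainder of `[5]` (two bundles are silently lost), `with_break` keeps the full remainder `[5, 5, 5]`, and `when_finalized` takes every bundle and leaves an empty remainder.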
@400Ping @iamjustinhsu I was able to create a minimal repro of this issue (at least, I think this is the same issue)

    import ray

    class Fn:
        def __call__(self, batch):
            return batch

    ds = ray.data.range(100, override_num_blocks=100).map_batches(Fn, batch_size=10).limit(1)
    for _ in ds.iter_internal_ref_bundles():
        pass

@iamjustinhsu would a reasonable fix be to just clear the internal queues when the operator is manually marked finished?

    def mark_execution_finished(self):
        # Discard remaining bundles in the internal bundle queue.
        self._bundle_queue.clear()
        # Discard remaining bundles in the block ref bundler.
        self._block_ref_bundler.done_adding_bundles()
        while self._block_ref_bundler.has_bundle():
            self._block_ref_bundler.get_next_bundle()
        super().mark_execution_finished()
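
A small sketch of why the order in that snippet may matter: calling `done_adding_bundles()` before draining. The `ToyBundler` class and `drain` helper below are hypothetical stand-ins that only borrow the method names from the snippet above; the assumption that `has_bundle()` reports nothing until either enough rows are buffered or the bundler is finalized is mine, not something confirmed by Ray's source.

```python
from collections import deque


class ToyBundler:
    """Hypothetical stand-in for a block ref bundler; not Ray's class."""

    def __init__(self, min_rows):
        self._min_rows = min_rows
        self._buffer = deque()  # each entry is one bundle's row count
        self._finalized = False

    def add_bundle(self, rows):
        self._buffer.append(rows)

    def done_adding_bundles(self):
        self._finalized = True

    def has_bundle(self):
        # Assumed behavior: small leftovers only become visible once
        # the bundler knows no more input is coming.
        if not self._buffer:
            return False
        return self._finalized or sum(self._buffer) >= self._min_rows

    def get_next_bundle(self):
        return self._buffer.popleft()

    def num_buffered(self):
        return len(self._buffer)


def drain(bundler, finalize_first):
    """Discard everything has_bundle() exposes; return how many were dropped."""
    if finalize_first:
        bundler.done_adding_bundles()
    discarded = 0
    while bundler.has_bundle():
        bundler.get_next_bundle()
        discarded += 1
    return discarded


not_finalized = ToyBundler(min_rows=10)
not_finalized.add_bundle(3)
not_finalized.add_bundle(4)
dropped_without_finalize = drain(not_finalized, finalize_first=False)

finalized = ToyBundler(min_rows=10)
finalized.add_bundle(3)
finalized.add_bundle(4)
dropped_with_finalize = drain(finalized, finalize_first=True)
```

Under these assumptions, draining without finalizing first leaves both small bundles in the buffer, while finalizing first empties it, which is the state a "queue is empty" assertion would expect.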
@bveeramani oh i see now. We actually do this for all
Ah, okay. @400Ping would you mind refactoring the PR or opening a new PR to do the following:
I think this is all that we need to do to solve the flaky issue.
bveeramani left a comment
The implementations for clear_internal_input_queue and clear_internal_output_queue LGTM.
@400Ping to keep the git history clear, would you mind reverting all of the changes unrelated to those methods (e.g., formatting, removing _inputs_done, type annotations)? Once we've reverted the unrelated changes, I'll approve the PR
Ok.
f47460e to 04c6b1d
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Thanks for the fix!
@400Ping Just merged! ty for the contribution!
Thank you as well, I am a newbie in this area 😓. |
ActorPoolMapOperator clears all queues on completion
…8694)
## Description
The test fails intermittently with an assertion error indicating that the internal input queue for a MapBatches operator is not empty when it's expected to be. This suggests a race condition or timing issue in the streaming executor's queue management.
## Related issues
Closes ray-project#58546
## Additional information
---------
Signed-off-by: 400Ping <fourhundredping@gmail.com>
Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Co-authored-by: Balaji Veeramani <bveeramani@berkeley.edu>
Description
One of the PyTorch examples intermittently fails with an error like this:
This assertion can fail because of two bugs:
done_adding_bundles before clearing the block ref bundler
This PR fixes those bugs and deflakes the test.
Related issues
Closes #58546
Additional information